home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_queue.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2005-10-18  |  6KB  |  247 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. import Queue
  5. import sys
  6. import threading
  7. import time
  8. from test.test_support import verify, TestFailed, verbose
  9. QUEUE_SIZE = 5
  10.  
  11. class _TriggerThread(threading.Thread):
  12.     
  13.     def __init__(self, fn, args):
  14.         self.fn = fn
  15.         self.args = args
  16.         self.startedEvent = threading.Event()
  17.         threading.Thread.__init__(self)
  18.  
  19.     
  20.     def run(self):
  21.         time.sleep(0.10000000000000001)
  22.         self.startedEvent.set()
  23.         self.fn(*self.args)
  24.  
  25.  
  26.  
  27. def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
  28.     t = _TriggerThread(trigger_func, trigger_args)
  29.     t.start()
  30.     result = block_func(*block_args)
  31.     if not t.startedEvent.isSet():
  32.         raise TestFailed("blocking function '%r' appeared not to block" % block_func)
  33.     
  34.     t.join(10)
  35.     if t.isAlive():
  36.         raise TestFailed("trigger function '%r' appeared to not return" % trigger_func)
  37.     
  38.     return result
  39.  
  40.  
  41. def _doExceptionalBlockingTest(block_func, block_args, trigger_func, trigger_args, expected_exception_class):
  42.     t = _TriggerThread(trigger_func, trigger_args)
  43.     t.start()
  44.     
  45.     try:
  46.         block_func(*block_args)
  47.     except expected_exception_class:
  48.         raise 
  49.     else:
  50.         raise TestFailed('expected exception of kind %r' % expected_exception_class)
  51.     finally:
  52.         t.join(10)
  53.         if t.isAlive():
  54.             raise TestFailed("trigger function '%r' appeared to not return" % trigger_func)
  55.         
  56.         if not t.startedEvent.isSet():
  57.             raise TestFailed('trigger thread ended but event never set')
  58.         
  59.  
  60.  
  61.  
  62. class FailingQueueException(Exception):
  63.     pass
  64.  
  65.  
  66. class FailingQueue(Queue.Queue):
  67.     
  68.     def __init__(self, *args):
  69.         self.fail_next_put = False
  70.         self.fail_next_get = False
  71.         Queue.Queue.__init__(self, *args)
  72.  
  73.     
  74.     def _put(self, item):
  75.         if self.fail_next_put:
  76.             self.fail_next_put = False
  77.             raise FailingQueueException, 'You Lose'
  78.         
  79.         return Queue.Queue._put(self, item)
  80.  
  81.     
  82.     def _get(self):
  83.         if self.fail_next_get:
  84.             self.fail_next_get = False
  85.             raise FailingQueueException, 'You Lose'
  86.         
  87.         return Queue.Queue._get(self)
  88.  
  89.  
  90.  
  91. def FailingQueueTest(q):
  92.     if not q.empty():
  93.         raise RuntimeError, 'Call this function with an empty queue'
  94.     
  95.     for i in range(QUEUE_SIZE - 1):
  96.         q.put(i)
  97.     
  98.     q.fail_next_put = True
  99.     
  100.     try:
  101.         q.put('oops', block = 0)
  102.         raise TestFailed("The queue didn't fail when it should have")
  103.     except FailingQueueException:
  104.         pass
  105.  
  106.     q.fail_next_put = True
  107.     
  108.     try:
  109.         q.put('oops', timeout = 0.10000000000000001)
  110.         raise TestFailed("The queue didn't fail when it should have")
  111.     except FailingQueueException:
  112.         pass
  113.  
  114.     q.put('last')
  115.     verify(q.full(), 'Queue should be full')
  116.     q.fail_next_put = True
  117.     
  118.     try:
  119.         _doBlockingTest(q.put, ('full',), q.get, ())
  120.         raise TestFailed("The queue didn't fail when it should have")
  121.     except FailingQueueException:
  122.         pass
  123.  
  124.     q.put('last')
  125.     q.fail_next_put = True
  126.     
  127.     try:
  128.         _doExceptionalBlockingTest(q.put, ('full', True, 10), q.get, (), FailingQueueException)
  129.         raise TestFailed("The queue didn't fail when it should have")
  130.     except FailingQueueException:
  131.         pass
  132.  
  133.     q.put('last')
  134.     verify(q.full(), 'Queue should be full')
  135.     q.get()
  136.     verify(not q.full(), 'Queue should not be full')
  137.     q.put('last')
  138.     verify(q.full(), 'Queue should be full')
  139.     _doBlockingTest(q.put, ('full',), q.get, ())
  140.     for i in range(QUEUE_SIZE):
  141.         q.get()
  142.     
  143.     verify(q.empty(), 'Queue should be empty')
  144.     q.put('first')
  145.     q.fail_next_get = True
  146.     
  147.     try:
  148.         q.get()
  149.         raise TestFailed("The queue didn't fail when it should have")
  150.     except FailingQueueException:
  151.         pass
  152.  
  153.     verify(not q.empty(), 'Queue should not be empty')
  154.     q.fail_next_get = True
  155.     
  156.     try:
  157.         q.get(timeout = 0.10000000000000001)
  158.         raise TestFailed("The queue didn't fail when it should have")
  159.     except FailingQueueException:
  160.         pass
  161.  
  162.     verify(not q.empty(), 'Queue should not be empty')
  163.     q.get()
  164.     verify(q.empty(), 'Queue should be empty')
  165.     q.fail_next_get = True
  166.     
  167.     try:
  168.         _doExceptionalBlockingTest(q.get, (), q.put, ('empty',), FailingQueueException)
  169.         raise TestFailed("The queue didn't fail when it should have")
  170.     except FailingQueueException:
  171.         pass
  172.  
  173.     verify(not q.empty(), 'Queue should not be empty')
  174.     q.get()
  175.     verify(q.empty(), 'Queue should be empty')
  176.  
  177.  
  178. def SimpleQueueTest(q):
  179.     if not q.empty():
  180.         raise RuntimeError, 'Call this function with an empty queue'
  181.     
  182.     q.put(111)
  183.     q.put(222)
  184.     if q.get() == 111:
  185.         pass
  186.     verify(q.get() == 222, "Didn't seem to queue the correct data!")
  187.     for i in range(QUEUE_SIZE - 1):
  188.         q.put(i)
  189.         verify(not q.empty(), 'Queue should not be empty')
  190.     
  191.     verify(not q.full(), 'Queue should not be full')
  192.     q.put('last')
  193.     verify(q.full(), 'Queue should be full')
  194.     
  195.     try:
  196.         q.put('full', block = 0)
  197.         raise TestFailed("Didn't appear to block with a full queue")
  198.     except Queue.Full:
  199.         pass
  200.  
  201.     
  202.     try:
  203.         q.put('full', timeout = 0.01)
  204.         raise TestFailed("Didn't appear to time-out with a full queue")
  205.     except Queue.Full:
  206.         pass
  207.  
  208.     _doBlockingTest(q.put, ('full',), q.get, ())
  209.     _doBlockingTest(q.put, ('full', True, 10), q.get, ())
  210.     for i in range(QUEUE_SIZE):
  211.         q.get()
  212.     
  213.     verify(q.empty(), 'Queue should be empty')
  214.     
  215.     try:
  216.         q.get(block = 0)
  217.         raise TestFailed("Didn't appear to block with an empty queue")
  218.     except Queue.Empty:
  219.         pass
  220.  
  221.     
  222.     try:
  223.         q.get(timeout = 0.01)
  224.         raise TestFailed("Didn't appear to time-out with an empty queue")
  225.     except Queue.Empty:
  226.         pass
  227.  
  228.     _doBlockingTest(q.get, (), q.put, ('empty',))
  229.     _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
  230.  
  231.  
  232. def test():
  233.     q = Queue.Queue(QUEUE_SIZE)
  234.     SimpleQueueTest(q)
  235.     SimpleQueueTest(q)
  236.     if verbose:
  237.         print 'Simple Queue tests seemed to work'
  238.     
  239.     q = FailingQueue(QUEUE_SIZE)
  240.     FailingQueueTest(q)
  241.     FailingQueueTest(q)
  242.     if verbose:
  243.         print 'Failing Queue tests seemed to work'
  244.     
  245.  
  246. test()
  247.